1. Logstash Basics
Logstash is the data-processing pipeline of the ELK Stack. It collects data from many sources, transforms it, and forwards it to one or more destinations.
1.1 How Logstash Works
1.1.1 Pipeline Architecture
Pipeline stages
- Input: collects data from a variety of sources
- Filter: parses, transforms, and enriches the data
- Output: ships the processed data to its destinations
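The three stages can be seen in the smallest possible pipeline. The sketch below (a minimal example, assuming a local Logstash install) reads lines from stdin and prints the resulting events to stdout, which is a handy way to observe what each stage does to an event:

```ruby
# Minimal end-to-end pipeline: stdin -> filter -> stdout
input {
  stdin {}                        # Input: read lines typed into the terminal
}
filter {
  mutate {
    add_field => { "stage" => "demo" }   # Filter: enrich the event with a new field
  }
}
output {
  stdout { codec => rubydebug }   # Output: pretty-print the full event structure
}
```

Save it to a file and run `./bin/logstash -f <file>`; every line you type is echoed back as a structured event.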
1.1.2 Event Processing Flow
Every piece of data that passes through Logstash is wrapped in an **Event** object:
```json
{
  "@timestamp": "2024-01-01T12:00:00.000Z",
  "@metadata": {
    "version": "1"
  },
  "message": "original log message",
  "host": "server01",
  "path": "/var/log/app.log"
}
```
- @timestamp: the event timestamp
- @metadata: metadata (never included in the output)
- message: the original message content
- other fields: fields added during processing
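Fields are read with bracket syntax in conditionals and interpolated with sprintf syntax inside strings. A small sketch (field names taken from the event above):

```ruby
filter {
  if [host] == "server01" {                # bracket syntax reads a field value
    mutate {
      # sprintf interpolation embeds field values in a string
      add_field => { "source" => "log from %{host} at %{path}" }
    }
  }
}
```

Nested fields use chained brackets, e.g. `[@metadata][version]`.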
1.2 Installation and Configuration
1.2.1 Basic Configuration
Create a configuration file `logstash.conf`:
```ruby
input {
  # input plugin configuration
}
filter {
  # filter plugin configuration
}
output {
  # output plugin configuration
}
```
1.2.2 Run Modes
```bash
# Run in the foreground (development/debugging)
./bin/logstash -f config/logstash.conf

# Run in the background
./bin/logstash -f config/logstash.conf --log.level=info &

# Load every config file in a directory
./bin/logstash --path.config=/etc/logstash/conf.d

# Validate configuration syntax and exit
./bin/logstash -f config/logstash.conf --config.test_and_exit
```
Configuration validation
Always validate configuration syntax with `--config.test_and_exit` before deploying, to catch errors before they surface at runtime.
2. Input Plugins
2.1 File Input Plugin
2.1.1 Basic File Monitoring
```ruby
input {
  file {
    path => "/var/log/*.log"
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/.sincedb"
    ignore_older => 86400
  }
}
```
2.1.2 Multiple Paths
```ruby
input {
  file {
    path => [
      "/var/log/nginx/*.log",
      "/var/log/apache2/*.log",
      "/opt/app/logs/*.log"
    ]
    exclude => "*.gz"
    type => "web_logs"
    tags => ["web", "production"]
  }
}
```
File monitoring essentials
- sincedb_path: records the read position so files are not re-read after a restart
- start_position: "beginning" reads from the start of the file, "end" tails from the end
- ignore_older: skips files not modified within the given number of seconds
2.1.3 Handling Log Rotation
```ruby
input {
  file {
    path => "/var/log/app.log"
    mode => "read"   # the file_completed_* options only apply in read mode
    file_completed_action => "log"
    file_completed_log_path => "/var/log/logstash/completed.log"
  }
}
```
2.2 Network Input Plugins
2.2.1 TCP/UDP Input
```ruby
input {
  tcp {
    port => 5140
    type => "syslog"
    codec => "json"
  }
  udp {
    port => 514
    buffer_size => 8192
    workers => 4
  }
}
```
2.2.2 HTTP Input
```ruby
input {
  http {
    port => 8080
    response_headers => {
      "Access-Control-Allow-Origin" => "*"
      "Access-Control-Allow-Methods" => "GET, POST"
    }
    additional_codecs => {
      "application/json" => "json"
    }
  }
}
```
2.3 Message Queue Inputs
2.3.1 Redis Input
```ruby
input {
  redis {
    host => "127.0.0.1"
    port => 6379
    key => "logstash:events"
    data_type => "list"
    codec => "json"
  }
}
```
2.3.2 Kafka Input
```ruby
input {
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092"
    topics => ["app-logs", "system-logs"]
    group_id => "logstash-consumer"
    consumer_threads => 3
    codec => "json"
    auto_offset_reset => "earliest"
  }
}
```
2.4 Beats Input
2.4.1 Filebeat Input
```ruby
input {
  beats {
    port => 5044
    ssl => false
    congestion_threshold => 5   # note: deprecated and removed in newer versions of the beats input plugin
  }
}
```
Input plugin selection strategy
- Small deployments: use the file and tcp plugins
- Large deployments: combine beats with redis/kafka as a buffer
- High availability: configure multiple input sources and load balancing
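The broker-buffered layout described above can be sketched as two cooperating Logstash tiers; this is only an illustration, and the host names and topic name are placeholders:

```ruby
# Tier 1 (shipper instance): receive events from Beats, buffer them in Kafka
input {
  beats { port => 5044 }
}
output {
  kafka {
    bootstrap_servers => "kafka01:9092"   # placeholder broker address
    topic_id => "raw-logs"                # placeholder topic
  }
}

# Tier 2 (indexer instance, a separate Logstash process/config):
# drain Kafka at its own pace and index into Elasticsearch
input {
  kafka {
    bootstrap_servers => "kafka01:9092"
    topics => ["raw-logs"]
    group_id => "indexers"
  }
}
output {
  elasticsearch { hosts => ["localhost:9200"] }
}
```

Because Kafka absorbs bursts, the indexer tier can be scaled (more consumers in the same group) independently of the shippers.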
3. Filter Plugins
3.1 Grok Pattern Matching
3.1.1 Grok Basics
Grok is Logstash's most powerful parsing tool; it combines regular expressions with a library of predefined patterns:
```ruby
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:logger} - %{GREEDYDATA:message}"
    }
  }
}
```
3.1.2 Common Grok Patterns
| Pattern | Description | Example |
|---|---|---|
| %{IP:client_ip} | IP address | 192.168.1.1 |
| %{HOSTNAME:host} | Hostname | web-server-01 |
| %{TIMESTAMP_ISO8601:timestamp} | ISO timestamp | 2024-01-01T12:00:00Z |
| %{LOGLEVEL:level} | Log level | INFO, ERROR, DEBUG |
| %{NUMBER:number} | Number | 123, 45.67 |
| %{WORD:word} | Single word | application |
| %{DATA:data} | Arbitrary data | any string |
| %{GREEDYDATA:greedy} | Greedy match | everything that remains |
3.1.3 Parsing Complex Logs
```ruby
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{IP:client} %{WORD:method} %{URIPATH:uri} %{NUMBER:status} %{NUMBER:bytes} %{QUOTEDSTRING:referrer} %{QUOTEDSTRING:user_agent}"
    }
    patterns_dir => ["/etc/logstash/patterns"]
  }
}
```
Grok performance tips
- Avoid overly complex regular expressions
- Keep reusable patterns in pattern files
- Set a timeout so a pathological match cannot block the pipeline
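The timeout advice above can be expressed directly in the grok filter; a sketch using the plugin's timeout and tagging options:

```ruby
filter {
  grok {
    match          => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{GREEDYDATA:content}" }
    timeout_millis => 1000                     # abort a single match attempt after 1s instead of blocking
    tag_on_timeout => ["_groktimeout"]         # tag timed-out events so they can be routed for inspection
    tag_on_failure => ["_grokparsefailure"]    # default failure tag, shown here explicitly
  }
}
```

Events carrying `_groktimeout` or `_grokparsefailure` can then be sent to a separate output for review.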
3.1.4 Custom Pattern Files
Create `/etc/logstash/patterns/custom.patterns`:
```
CUSTOM_TIMESTAMP %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}
CUSTOM_LOG_LEVEL (TRACE|DEBUG|INFO|WARN|ERROR|FATAL)
CUSTOM_APP_NAME [A-Za-z0-9_-]+
```
Reference the custom patterns in your configuration:
```ruby
filter {
  grok {
    patterns_dir => ["/etc/logstash/patterns"]
    match => {
      "message" => "%{CUSTOM_TIMESTAMP:timestamp} %{CUSTOM_LOG_LEVEL:level} %{CUSTOM_APP_NAME:app} %{GREEDYDATA:content}"
    }
  }
}
```
3.2 Data Transformation Filters
3.2.1 Mutate Filter
```ruby
filter {
  mutate {
    # rename a field
    rename => {
      "old_field" => "new_field"
    }
    # convert field types
    convert => {
      "status_code" => "integer"
      "response_time" => "float"
      "is_active" => "boolean"
    }
    # string operations
    uppercase => ["field1", "field2"]
    lowercase => ["field3"]
    # split a field into an array
    split => {
      "tags" => ","
    }
    # join an array into a string
    join => {
      "tags" => ","
    }
    # remove fields
    remove_field => ["unwanted_field"]
  }
}
```
3.2.2 Date Filter
```ruby
filter {
  date {
    # the first element is the source field; the rest are candidate formats
    match => [ "timestamp", "ISO8601", "yyyy-MM-dd HH:mm:ss" ]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }
}
```
3.2.3 GeoIP Filter
```ruby
filter {
  geoip {
    source => "client_ip"
    target => "geo"
    database => "/path/to/GeoLite2-City.mmdb"
    add_field => {
      "location" => "%{[geo][location]}"
    }
  }
}
```
3.3 Data Enrichment Filters
3.3.1 DNS Filter
```ruby
filter {
  dns {
    reverse => ["client_ip"]
    action => "replace"
    nameserver => ["8.8.8.8", "8.8.4.4"]
  }
}
```
3.3.2 UserAgent Filter
```ruby
filter {
  useragent {
    source => "user_agent"
    target => "useragent"
  }
}
```
3.3.3 Translate Filter
```ruby
filter {
  translate {
    field => "status_code"
    destination => "status_text"
    dictionary => {
      "200" => "OK"
      "404" => "Not Found"
      "500" => "Internal Server Error"
    }
    fallback => "Unknown Status"
  }
}
```
3.4 Conditionals
3.4.1 Conditional Filtering
```ruby
filter {
  if [type] == "apache" {
    grok {
      match => {
        "message" => "%{COMBINEDAPACHELOG}"
      }
    }
  } else if [type] == "nginx" {
    grok {
      match => {
        "message" => "%{NGINXACCESS}"
      }
    }
  } else {
    drop {}
  }
}
```
3.4.2 Multiple Conditions
```ruby
filter {
  if [status] >= 400 and [status] < 500 {
    mutate {
      add_tag => ["client_error"]
    }
  } else if [status] >= 500 {
    mutate {
      add_tag => ["server_error"]
      add_field => { "alert" => "true" }   # add_field values are strings
    }
  }
}
```
4. Output Plugins
4.1 Elasticsearch Output
4.1.1 Basic Configuration
```ruby
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
    document_type => "_doc"   # deprecated; omit on Elasticsearch 7+
    template_name => "logstash"
    template_overwrite => true
  }
}
```
4.1.2 Advanced Configuration
```ruby
output {
  elasticsearch {
    hosts => ["es01:9200", "es02:9200", "es03:9200"]
    index => "%{[@metadata][index]}-%{+YYYY.MM.dd}"
    # connection settings
    ssl => true
    cacert => "/etc/ssl/certs/ca.pem"
    # performance settings
    # (note: workers, batch_size, and flush_size are not supported in recent
    # plugin versions; batching is controlled by the pipeline settings instead)
    workers => 4
    batch_size => 1000
    flush_size => 5000
    # retry settings
    retry_on_conflict => 3
    retry_max_interval => 60
    # template settings
    manage_template => true
    template => "/etc/logstash/templates/logstash.json"
  }
}
```
4.2 File Output
```ruby
output {
  file {
    path => "/var/log/logstash/output-%{+YYYY-MM-dd}.log"
    codec => "json_lines"
    gzip => true
  }
}
```
4.3 Message Queue Outputs
4.3.1 Redis Output
```ruby
output {
  redis {
    host => "127.0.0.1"
    port => 6379
    key => "logstash:events"
    data_type => "list"
    batch_events => 50
  }
}
```
4.3.2 Kafka Output
```ruby
output {
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092"
    topic_id => "logstash-events"
    compression_type => "gzip"
    acks => "1"
    retries => 3
  }
}
```
4.4 Conditional Output
```ruby
output {
  if [type] == "error" {
    elasticsearch {
      hosts => ["error-es:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
    email {
      to => "[email protected]"
      subject => "Error Alert: %{message}"
    }
  }
  # default output
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
}
```
5. Codecs
5.1 Common Codecs
5.1.1 JSON Codec
```ruby
input {
  tcp {
    port => 9999
    codec => json
  }
}
output {
  file {
    path => "/var/log/output.log"
    codec => json_lines
  }
}
```
5.1.2 Multiline Codec
```ruby
input {
  file {
    path => "/var/log/java_app.log"
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
  }
}
```
5.1.3 Customizing Codec Settings
```ruby
input {
  tcp {
    port => 9999
    codec => json {
      charset => "UTF-8"
    }
  }
}
```
6. Configuration Management and Optimization
6.1 Organizing Configuration Files
6.1.1 Directory Layout
```
/etc/logstash/
├── conf.d/
│   ├── 01-input.conf
│   ├── 10-filter.conf
│   └── 99-output.conf
├── patterns/
│   └── custom.patterns
└── templates/
    └── logstash.json
```
6.1.2 Splitting the Configuration
```ruby
# 01-input.conf
input {
  beats { port => 5044 }
  tcp { port => 5140 }
}

# 10-filter.conf
filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{WORD:program} %{GREEDYDATA:content}" }
  }
  date {
    match => [ "timestamp", "MMM dd HH:mm:ss" ]
  }
}

# 99-output.conf
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
```
6.2 Performance Tuning
6.2.1 JVM Tuning
```
# jvm.options
-Xms2g
-Xmx2g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
```
6.2.2 Pipeline Settings
```yaml
# logstash.yml
pipeline:
  workers: 4
  batch:
    size: 125
    delay: 5
  unsafe_shutdown: false
queue:
  type: persisted
  path: "/var/lib/logstash/queue"
  max_bytes: 1024mb
```
6.2.3 Monitoring and Debugging
```ruby
filter {
  ruby {
    code => "
      event.set('processing_time', (Time.now.to_f * 1000).to_i)
    "
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
```
Performance tuning checklist
- [ ] Tune the number of pipeline workers
- [ ] Configure an appropriate batch size
- [ ] Use the persisted queue to guard against data loss
- [ ] Monitor JVM memory usage and GC behavior
- [ ] Clean up expired data regularly
6.3 Troubleshooting
6.3.1 Common Problems
| Problem | Symptom | Fix |
|---|---|---|
| Configuration syntax error | Startup fails | Validate with --config.test_and_exit |
| Out of memory | OOM errors | Increase the JVM heap, simplify filters |
| Data loss | Queue backlog | Enable the persisted queue, add workers |
| Performance bottleneck | Processing lag | Tune the batch size, optimize grok patterns |
6.3.2 Debugging Techniques
```ruby
output {
  # debug output: pretty-print every event
  stdout {
    codec => rubydebug
  }
  # conditional debugging: capture events that failed grok parsing
  if "_grokparsefailure" in [tags] {
    file {
      path => "/var/log/logstash/grok_failures.log"
    }
  }
}
```
Production notes
- Remove the stdout output plugin to avoid its performance overhead
- Configure a sensible log rotation policy
- Set up monitoring and alerting rules
- Back up configuration and pattern files regularly
7. Practical Examples
7.1 Processing Nginx Logs
```ruby
input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  }
}
filter {
  grok {
    match => {
      "message" => '%{IP:client} - - \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATH:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:bytes} "%{DATA:referrer}" "%{DATA:user_agent}"'
    }
  }
  date {
    # use lowercase yyyy: uppercase YYYY is week-year and misparses dates around year boundaries
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
  }
  geoip {
    source => "client"
  }
  useragent {
    source => "user_agent"
    target => "ua"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "nginx-%{+YYYY.MM.dd}"
  }
}
```
7.2 Handling Multiline Java Exceptions
```ruby
input {
  file {
    path => "/var/log/app/*.log"
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
      auto_flush_interval => 2
    }
  }
}
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:class} - %{GREEDYDATA:content}"
    }
  }
  if [level] == "ERROR" or [level] == "FATAL" {
    mutate {
      add_tag => ["error"]
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "java-app-%{+YYYY.MM.dd}"
  }
}
```
Best-practice summary
- Choose the input plugin that matches how the logs are produced
- Use grok for precise field extraction
- Apply appropriate data conversion and enrichment
- Configure a sensible index rotation policy
- Set up monitoring and alerting